package com.xujian.flink.paralle;

import org.apache.flink.api.common.functions.FlatMapFunction;
import org.apache.flink.api.common.functions.MapFunction;
import org.apache.flink.api.java.functions.KeySelector;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.streaming.api.datastream.DataStreamSource;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.util.Collector;

/**
 * @author star xu
 * @date 2023/4/3 14:58
 * @Description:
 * @Slogan: 致敬大师,致敬未来的自己
 */
public class Flink01_Streaming_unbounded_wordcount {
    public static void main(String[] args) throws Exception {
        // 1. 创建流处理执行环境
        Configuration configuration = new Configuration();
        configuration.setInteger("rest.port",1000);
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment(configuration);
        env.setParallelism(1);

        // 2. 拿数据 切分  转换
        //nc -lp 1000
        DataStreamSource<String> socketTextStream = env.socketTextStream("localhost", 9999);
        socketTextStream.flatMap(new FlatMapFunction<String, String>() {
                    @Override
                    public void flatMap(String line, Collector<String> out) throws Exception {
                        String[] words = line.split(" ");
                        for (String word : words) {
                            out.collect(word);
                        }
                    }
                }).map(new MapFunction<String, Tuple2<String, Long>>() {
                    @Override
                    public Tuple2<String, Long> map(String word) throws Exception {
                        return Tuple2.of(word, 1L);
                    }
                })

                // 3. 分组  sum
                .keyBy(new KeySelector<Tuple2<String, Long>, String>() {
                    @Override
                    public String getKey(Tuple2<String, Long> tuple2) throws Exception {
                        return tuple2.f0;
                    }
                })
                // sum 操作必须在keyby之后进行
                .sum(1)
                // 输出到控制台
                .print();

        // 4. 执行
        env.execute();
    }
}